package com.atuguigu.flink.sensor;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByExample {
    public static void main(String[] args)throws  Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);


        DataStreamSource<SendsorReading> stream = env.addSource(new SensorSource());


        KeyedStream<SendsorReading, String> keyedStream = stream.keyBy(
                new KeySelector<SendsorReading, String>() {


                    @Override
                    public String getKey(SendsorReading sendsorReading) throws Exception {
                        return sendsorReading.id;
                    }
                }
        );


        SingleOutputStreamOperator<SendsorReading> reduceStream = keyedStream.reduce(
                new ReduceFunction<SendsorReading>() {
                    @Override
                    public SendsorReading reduce(SendsorReading sendsorReading, SendsorReading t1) throws Exception {
                        return new SendsorReading(sendsorReading.id, Math.min(sendsorReading.temperture, t1.temperture), 0L);
                    }
                }
        );

        reduceStream.print();


        env.execute();
    }
}
